Vertex AI Pipelines
コードを実行したらコンパイルして Vertex AI Pipelines に投入する
手元から pipeline を投入するときに
code:run.py
if __name__ == "__main__":
# 邪魔じゃないなら単にこれでも
# dest = __file__.replace(".py", ".yaml")
import os
import tempfile
filename = os.path.basename(__file__).replace(".py", ".yaml")
dest = os.path.join(tempfile.mkdtemp(), filename)
compiler = kfp.compiler.Compiler()
compiler.compile(pipeline_func=pipeline, package_path=dest)
print(f"Generated pipeline: {dest}")
import google.cloud.aiplatform as aip
aip.init(project=project, location=location)
job = aip.PipelineJob(
display_name="sample",
template_path=dest,
)
job.submit(service_account="...")
マシンスペックを指定する
pipeline 側で component ごとに設定、PipelineTask に対する set_ 系メソッドで設定する (TaskConfiguration)
foo_task().set_env_variable("FOO", "bar") 的なやつ
code:task_configurations.py
.add_accelerator_type
.set_accelerator_limit
.set_cpu_limit
.set_memory_request # Vertex AI Pipelines はサポートしてない
.set_memory_limit
.set_env_variable
.set_caching_options
.set_display_name
.set_retry
.ignore_upstream_failure
デフォルトは e2-standard-4 (4 cpu 16GB mem)
ただし GPU を付けると n1-highmem-2 (2 cpu 13GB mem) になる、e2 は GPU 付けれないため n1 系になる
使いそうなやつ
e2-standard-4 GPU なし, デフォルト
.set_cpu_limit("4").set_memory_limit("16G")
e2-highmem-2 GPU なし最安 ($0.133257/h in Tokyo) これ以下を指定する意味ない
.set_cpu_limit("2").set_memory_limit("16G")
n1-highmem-2 GPU 可で最安だが ログでないので使わない
.set_cpu_limit("2").set_memory_limit("13G")`
n1-standard-4 GPU 可の実用最安 ($0.280600/h in Tokyo)
.set_cpu_limit("4").set_memory_limit("15G")
n1-highmem-4 GPU 可
.set_cpu_limit("4").set_memory_limit("26G")
n1-standard-8 GPU 可
.set_cpu_limit("8").set_memory_limit("30G")`
GPU 指定
PipelineTask.add_node_selector_constraint() takes 2 positional arguments but 3 were given
あれ、最近の kfp (2.7)は key いらないらしい
print_torch().add_node_selector_constraint("NVIDIA_TESLA_T4").set_accelerator_limit(1)
GPU 付けたら起動までにめちゃめちゃ時間がかかる、10分~ 程度
リージョンやマシンタイプと組み合わせられる GPU に制限がある
観測上 e2-highmem-2 未満のリソースを要求してもそれより小さいインスタンスは割り当てられない
低スペで GPU つけるとログ出ない
もっと目立つところに書いておけ
GPU で n1-highmem-2 などの小型のマシンタイプを使用すると、CPU の制約により、一部のワークロードでロギングが失敗する可能性があります。トレーニング ジョブがログを返さなくなった場合は、より大きなマシンタイプの選択を検討してください。
可能性がありますだけど、経験上 n1-highmem-2 で GPU 乗せるとコンポーネント内のログやエラーが残ったことない、デバッグにめちゃめちゃ苦労するので設定するのがよい
code:py
# n1-highmem-4 ($0.348910/h in Tokyo)
comp1()
.set_cpu_limit("4")
.set_memory_limit("26G")
.add_node_selector_constraint("NVIDIA_TESLA_T4")
.set_accelerator_limit(1)
# n1-standard-4 ($0.280600/h in Tokyo)
comp2()
.set_cpu_limit("4")
.set_memory_limit("15G")
.add_node_selector_constraint("NVIDIA_TESLA_T4")
.set_accelerator_limit(1)
Artifact の扱い
Output[] をコンポーネントの引数で受けたり、Artifactを返すと残る、じゃんじゃか増えていく
metadata をうまくつけておくと検索して使えるので便利
output.path に書き込むと対応する Cloud Storage に書き込まれ、output.uri に gs://... が入ってる
Output[HTML], Output[Markdown] 等はコンソールで内容を表示できる
Artifact の名前を Output 以外にする
Traditional artifact syntax (component の引数で Output[Artifact] を受ける方)は引数名が Vertex ML Metadata 上の名前になる Pythonic artifact syntax で名前を指定する方法はない?
Cloud Storage に書き込む & Metadata 残す
独自型を定義してもよい、追加で出せる内容が増える
output = dsl.Dataset(uri=dsl.get_uri())
output.metadata = {...} でメタデータ残せる
dsl.get_uri(suffix='sample.txt') でファイル名決める
1つのコンポーネントから複数の output を返す時は実質必須、デフォルト suffix="Output" なので同じ書き込み先
output.display_name='...' で表示名
name は Google Cloud リソースの name 参照 (projects/.../locations/... 的な) なので参照時にしか使わない
デフォルトで /gcs/{bucket_name} にアクセスすると読み出せる
Artifact の uri の先頭 gs:// を置換して読んでもよい
input.uri.replace("gs://", "/gcs/")
code:gcs_components.py
@dsl.component(base_image="python:3.11")
def save_artifact() -> dsl.Dataset:
out = dsl.Dataset(uri=dsl.get_uri(suffix='sample.txt')) # dsl.get_uri() で保存先を生成
with open(out.path, "w") as f: # Dataset に対応するローカルパスに書き込み
f.write("sample")
return out # Dataset 返す
@dsl.component(base_image="python:3.11")
def read_artifact(input: dsl.Dataset):
# gs://... が設定されているので GCS FUSE で読めるパスへ
# そのまま Cloud Storage Client などで読んでもよい
path = input.uri.replace("gs://", "/gcs/")
with open(path, "r") as f: # 開いて読む
print(f.readlines())
pass
@dsl.pipeline()
def pipeline():
save_artifact_task = save_artifact()
read_artifact(input=save_artifact_task.output)
Pipeline を Experiment と紐づける
code:experiment_linking.py
job = aip.PipelineJob(
template_path=dest,
parameter_values=parameter_values,
...
)
job.submit(
service_account=service_account,
experiment=experiment, # ここで名前つける
)
メトリックを記録する
コンポーネントで Output[Metrics] を引数に取り log_metrics(key, value) を呼ぶ
pipeline の artifact として残る
Experiment に対しパイプラインが Experiment Run として紐づく
メトリックはパイプライン単位で残る、同じキーで書いてみても一応両方見れるが良くはなさそう
code:battle.py
@dsl.component(base_image="python:3.11")
metrics.log_metric("from", "a")
metrics.log_metric("a", 123)
@dsl.component(base_image="python:3.11")
metrics.log_metric("from", "b")
metrics.log_metric("b", 456)
https://gyazo.com/d88d0107306075ca7191dbd9cfff7ac4 https://gyazo.com/3088f7bcd1ba8d4befe9efbe8836810b
複数のコンポーネントで別々のメトリックを書きたいなら自分で google.cloud.aiplatform 呼ぶしかないかな
kfp は metrics.log_metric(key, value) に対し
ExperimentRun は run.log_metrics({ key: value }) である
run_pipeline と同じ名前なら同じ Experiment に紐づく
パイプライン実行とテスト実行が同じテーブルなの微妙に嫌だけど、カラムあるってことは混ぜて使う想定かねえ
繰り返し実行で名前がかぶるので
被ってたら resume で上書きする
code:resume.py
name = "use-aiplatform-in-component"
resume = aiplatform.ExperimentRun.get(name) is not None
with aiplatform.start_run(name, resume=resume) as run:
run.log_metrics(...)
別々の ID を振る
task_id=dsl.PIPELINE_TASK_ID_PLACEHOLDER
code:task_id.py
name = f"use-aiplatform-in-component-{task_id}"
with aiplatform.start_run(name) as run:
run.log_metrics(...)
パイプライン実行リージョンと Exeperiment のリージョン別にする
うーん、run_pipeline ではパイプライン実行リージョンを指定し、expeirment は指定しない(してもいいが)
kfp の metrics は使わず aiplatform で自分で送る、かなあ
GCS FUSE
pipeline_root と関係なく gs://BUCKET_NAME/... にアクセスして権限があれば読み書きできる
仮想的に世界の全ての bucket がマウントされているような雰囲気、アクセスした時に権限がなければ弾かれる
impersonate で読み書きしたいが方法はなさそう
Secret Manager 使う
@dsl.component(..., packages_to_install=['google-cloud-secret-manager']) して普通に client 作って参照する
このサンプルの読み出すコンポーネントから受け渡したら Cloud Storage に残って良くないのでは?
pipeline 引数
def pipeline(foo: int, bar: int): ... して
aiplatform.PipelineJob(parameter_values={foo: 123, bar: 456}) のようにジョブ設定で渡す
キャッシュはデフォルト有効
らしいので 明に切る...と思ったけど実際あると便利
切る方法
foo_task(...).set_caching_options(False) コンポーネント単位で無効
aiplatform.PipelineJob(..., enable_caching=False) パイプライン単位で無効(Job投入前の設定)
パイプライン名、入出力、コンポーネントの外形的な仕様がキャッシュキーになる
パイプライン コンポーネントは決定論的に構築する必要があります。特定の入力セットでは、常に同じ出力が生成される必要があります
なのでコンポーネント内の実装いじって実行してもキャッシュのままなことがある
またコンポーネント実行で失敗していてもキャッシュ使い回される? ぽいので、修正しているつもりができてないことがある
コンポーネントとして呼ばれたパイプラインのキャッシュはどうなる?
共有の使い回しパイプライン
パイプラインA → 使い回しパイプライン
パイプラインB → 使い回しパイプライン
経験的にはされてなさそう(ちゃんとみてない)
親となる呼び出し元の名前が変わったらキャッシュは共有されない、まあそうか
キャッシュが効いていても出力するパイプライン実行には Artifact が追加される
助かる挙動
1. パイプライン実行1で Aritfact A を生成
2. パイプラインを修正
3. パイプライン実行2 で
Artifact A の生成はキャッシュが効いて処理をスキップ
Aritfact B を新たに生成
としたときに、A は 実行1, 実行2 両方のコンテキストから引ける、B は 実行2 からのみ
実行サービスアカウント
Failed to create pipeline job. Error: Vertex AI Service Agent ... does not have permission to access Artifact Registry repository
デフォルトでは Vertex AI Service Agent のサービスアカウントで動作する?
なのでカスタムイメージ pull するのに Artifact Registry の権限必要?
Artifact Registry からイメージを pull するようにカスタム サービス アカウントを構成することはできません。Vertex AI は、デフォルトのサービス アカウントを使用してイメージを pull します。
このデフォルトは Service Agent なのか、Compute Default なのかどっちだ
大抵実行用サービスアカウントを作って使うだろうし
job.submit(service_account=...) で指定したユーザの権限で ArtifactRegistry pull している
ml.googleapis.com/ prefix のもの
Cloud ML Job
失敗したコンポーネントが1つあれば全体を失敗させる
フェイルファストにする
aiplatform.PipelineJob(..., failure_policy='fast') デフォルトは slow
Google Cloud 用コンポーネント
pipeline_root
省略するとデフォルトで以下が root になる
gs://{PROJECT_ID}-vertex-pipelines-{REGION}/output_artifacts/
各パイプラインが実行されるのは
{PIPELINE_ROOT}/{実行PROJECT_NUMBER?}/{PIPELINE_JOB_NAME}
PIPELINE_JOB_NAME は dsl.PIPELINE_JOB_NAME_PLACEHOLDER
既に存在する Artifact を返す
1つの func を別々の base_image を指定したい
コードコピペしたくねえが hermetic に(component の中に一通り)書くので使いまわしづらい...と思いきや
ここは普通の Python としてデコレータを単に呼べば良い
code:call_decorator.py
image_v1 = 'asia-northeast1-docker.pkg.dev/...'
image_v2 = 'asia-northeast1-docker.pkg.dev/...'
def my_component(...) -> dsl.Artifact:
...
foo_v1 = dsl.component(base_image=image_v1, func=my_component)
foo_v2 = dsl.component(base_image=image_v2, func=my_component)
@dsl.pipeline
def pipeline():
foo_v1_task = foo_v1(...)
foo_v2_task = foo_v2(...)
compare(v1=foo_v1_task.output, v2=foo_v2_task.output)